21 asyncio异步编程
你要同时请求10个API接口,用同步代码得一个一个等,总共要等10倍时间。用asyncio可以让它们"同时"发起请求,哪个先回来就先处理哪个,总时间只取决于最慢的那个请求。
asyncio是Python的异步I/O框架,用协程(coroutine)在单线程里实现并发。它不是多线程,也不是多进程,而是通过await在等待I/O时切换到其他任务,充分利用等待时间。
一、协程基础
1.1 async/await
python
import asyncio
# 用async def定义协程函数
async def hello():
print("Hello")
await asyncio.sleep(1) # 等待时可以切换到其他任务
print("World")
# 运行协程
asyncio.run(hello())async def定义的是协程函数,调用它不会立即执行,而是返回一个协程对象。必须用await或asyncio.run()来执行。
1.2 await
await用于等待一个可等待对象(协程、Task、Future)完成。
python
import asyncio
async def fetch_data():
print("开始获取数据...")
await asyncio.sleep(2) # 模拟网络请求
print("数据获取完成")
return {"name": "大志", "age": 28}
async def main():
# await会等待fetch_data完成
result = await fetch_data()
print(result)
asyncio.run(main())二、任务与并发
2.1 create_task()
创建任务,让多个协程并发执行。
python
import asyncio
async def fetch(url, delay):
print(f"开始请求 {url}")
await asyncio.sleep(delay)
print(f"完成请求 {url}")
return f"{url} 的数据"
async def main():
# 创建任务(立即开始执行)
task1 = asyncio.create_task(fetch("url1", 2))
task2 = asyncio.create_task(fetch("url2", 1))
task3 = asyncio.create_task(fetch("url3", 3))
# 等待所有任务完成
result1 = await task1
result2 = await task2
result3 = await task3
print(f"结果: {result1}, {result2}, {result3}")
asyncio.run(main())
# 总共只需3秒(取决于最慢的任务),而不是6秒2.2 gather()
更简洁的并发方式。
python
import asyncio
async def fetch(url, delay):
await asyncio.sleep(delay)
return f"{url} 的数据"
async def main():
# gather并发运行多个协程
results = await asyncio.gather(
fetch("url1", 2),
fetch("url2", 1),
fetch("url3", 3)
)
print(results)
# ['url1 的数据', 'url2 的数据', 'url3 的数据']
asyncio.run(main())2.3 gather()的异常处理
python
import asyncio
async def risky_task():
await asyncio.sleep(1)
raise ValueError("出错了")
async def safe_task():
await asyncio.sleep(1)
return "成功"
async def main():
# return_exceptions=True:异常作为结果返回,不会中断其他任务
results = await asyncio.gather(
risky_task(),
safe_task(),
return_exceptions=True
)
print(results)
# [ValueError('出错了'), '成功']
asyncio.run(main())三、等待控制
3.1 wait()
更灵活的等待方式。
python
import asyncio
async def fetch(url, delay):
await asyncio.sleep(delay)
return f"{url} 完成"
async def main():
tasks = [
asyncio.create_task(fetch("url1", 2)),
asyncio.create_task(fetch("url2", 1)),
asyncio.create_task(fetch("url3", 3)),
]
# FIRST_COMPLETED:有一个完成就返回
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f"已完成: {len(done)}, 未完成: {len(pending)}")
for task in done:
print(f"结果: {task.result()}")
asyncio.run(main())return_when参数:
FIRST_COMPLETED:有一个任务完成就返回FIRST_EXCEPTION:有一个任务抛异常就返回ALL_COMPLETED:所有任务完成才返回(默认)
3.2 as_completed()
按完成顺序获取结果。
python
import asyncio
async def fetch(url, delay):
await asyncio.sleep(delay)
return f"{url} 完成"
async def main():
tasks = [
asyncio.create_task(fetch("url1", 3)),
asyncio.create_task(fetch("url2", 1)),
asyncio.create_task(fetch("url3", 2)),
]
# 按完成顺序获取结果
for task in asyncio.as_completed(tasks):
result = await task
print(result)
asyncio.run(main())
# url2 完成
# url3 完成
# url1 完成四、异步同步原语
4.1 Lock
异步锁,保护共享资源。
python
import asyncio
shared_resource = 0
lock = asyncio.Lock()
async def increment():
global shared_resource
async with lock:
current = shared_resource
await asyncio.sleep(0.1) # 模拟处理
shared_resource = current + 1
async def main():
await asyncio.gather(*[increment() for _ in range(10)])
print(f"结果: {shared_resource}") # 10
asyncio.run(main())4.2 Event
异步事件,用于任务间通知。
python
import asyncio
event = asyncio.Event()
async def waiter():
print("等待事件...")
await event.wait()
print("事件触发了!")
async def setter():
await asyncio.sleep(2)
event.set() # 触发事件
async def main():
await asyncio.gather(waiter(), setter())
asyncio.run(main())4.3 Semaphore
信号量,限制并发数量。
python
import asyncio
semaphore = asyncio.Semaphore(3) # 最多3个并发
async def limited_task(id):
async with semaphore:
print(f"任务 {id} 开始")
await asyncio.sleep(2)
print(f"任务 {id} 完成")
async def main():
await asyncio.gather(*[limited_task(i) for i in range(10)])
asyncio.run(main())
# 同时只有3个任务在执行4.4 Queue
异步队列,用于生产者-消费者模式。
python
import asyncio
async def producer(queue):
for i in range(5):
await asyncio.sleep(1)
await queue.put(f"数据 {i}")
print(f"生产: 数据 {i}")
async def consumer(queue):
while True:
item = await queue.get()
print(f"消费: {item}")
queue.task_done()
async def main():
queue = asyncio.Queue()
# 启动生产者和消费者
await asyncio.gather(
producer(queue),
consumer(queue),
return_exceptions=True
)
asyncio.run(main())五、实战场景
5.1 批量请求API
python
import asyncio
import aiohttp # 需要安装: pip install aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.json()
async def main():
urls = [
"https://api.example.com/data1",
"https://api.example.com/data2",
"https://api.example.com/data3",
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())5.2 超时控制
python
import asyncio
async def slow_task():
await asyncio.sleep(10)
return "完成"
async def main():
try:
# 设置超时时间
result = await asyncio.wait_for(slow_task(), timeout=3)
print(result)
except asyncio.TimeoutError:
print("任务超时!")
asyncio.run(main())5.3 定时任务
python
import asyncio
async def periodic_task():
while True:
print("执行定时任务...")
await asyncio.sleep(5) # 每5秒执行一次
async def main():
task = asyncio.create_task(periodic_task())
# 运行30秒后取消
await asyncio.sleep(30)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("任务已取消")
asyncio.run(main())六、常见错误
6.1 忘记await
python
# 错误:忘记await
async def main():
task = asyncio.create_task(some_coroutine())
result = task # 这是Task对象,不是结果
# 正确
async def main():
task = asyncio.create_task(some_coroutine())
result = await task # 这才是结果6.2 在同步代码中调用协程
python
# 错误:在同步函数中直接调用协程
def sync_function():
result = await some_coroutine() # SyntaxError
# 正确:用asyncio.run()
def sync_function():
result = asyncio.run(some_coroutine())七、总结
asyncio的核心:
| 组件 | 用途 |
|---|---|
async def | 定义协程函数 |
await | 等待协程完成 |
asyncio.run() | 运行主协程 |
asyncio.create_task() | 创建任务 |
asyncio.gather() | 并发运行多个协程 |
asyncio.wait() | 灵活等待任务 |
asyncio.Lock/Event/Semaphore | 异步同步原语 |
asyncio.Queue | 异步队列 |
使用场景:
- 网络请求(配合aiohttp)
- 数据库查询(配合asyncpg等)
- 文件I/O
- 任何需要"等待"的操作
记住:asyncio适合I/O密集型任务,CPU密集型任务用multiprocessing。